Filtres et espace de représentation des réseaux de neurones ☕️☕️¶
Objectifs de la séquence
Être sensibilisé :
principe d’espace de représentation en deep learning.
Être capable de :
de visualiser les convolutions d’un réseau de neurones,
de visualiser l’espace de représentation d’un réseau de neurones,
de manipuler les librairies \(\texttt{pytorch}\), \(\texttt{plotly}\), \(\texttt{umap}\) et \(\texttt{bokeh}\).
Les exercices de cette session seront réalisés deux fois. Une fois sur le jeu de données CIFAR10 et une fois sur un jeu de données représentant les personnages des Simpsons.
Imports¶
# Pour Google Colaboratory
# Décommenter la ligne suivante
# !pip install umap-learn
# monter le jeu de données simpsons en local
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim.lr_scheduler import MultiStepLR
from torchvision import datasets, models
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torch.utils.data.sampler import SubsetRandomSampler
import umap
import plotly.express as px
import numpy as np
import copy
Configuration¶
import os
# Each itereates over the dataloader define with batch size will get a batch of batch size samples
# After the iterator has gone through every data sample (one epoch), then we shuffle the order and we go again.
batch_size = 128
num_workers = 2
I. Construction du jeu de données CIFAR10¶
Le \(\texttt{dataset}\) est une sorte de tableau qui contient les éléments de notre jeu de données. Le \(\texttt{dataloader}\) est l’objet qui nous permettra d’accéder à nos données via des batchs aléatoires. Rappelons que le calcul du gradient se fait sur les données. Cependant avec des fonctions aussi complexes qu’un réseau de neurones et avec des jeu de données aussi gros, il devient nécessaire de n’estimer le gradient que sur une partie de ces données.
L’objet \(\texttt{transform}\) permettra de normaliser les données qui seront données à notre modèle. En \(\texttt{pytorch}\), les données sont gérées par un data loader. En effet, on ne traite que très rarement tout le jeu de données d’un coup. On estime plutôt le gradient via un batch de données. De meilleurs résultats sont généralement observés lorsque le jeu de données est mélangé entre chaque itération d’optimisation.
Les parties qui commencent par un [•] Méthode de…. sont celles qu’il faudra réutiliser plus tard (plusieurs fois).
[•] Méthode de split¶
def split_dataset(dataset, valid_size = 0.0, random_state=42):
dataset_size = len(dataset)
indices = list(range(dataset_size))
split = int(np.floor(valid_size * dataset_size))
np.random.seed(random_state)
np.random.shuffle(indices)
train_idx, valid_idx = indices[split:], indices[:split]
train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)
return dataset, copy.deepcopy(dataset), train_sampler, valid_sampler
Construction du jeu de données¶
# label names
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
]
)
#root_directory where images are.
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainset, validset, train_sampler, valid_sampler = split_dataset(trainset, valid_size = 0.2, random_state=None)
trainloader = DataLoader(
trainset, batch_size=batch_size, sampler=train_sampler,
num_workers=num_workers,
)
validloader = DataLoader(
validset, batch_size=batch_size, sampler=valid_sampler,
num_workers=num_workers,
)
print(len(trainloader), len(validloader))
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = DataLoader(
testset, batch_size=batch_size, shuffle=True,
num_workers=num_workers,
)
print('Nb test batchs:', len(testloader))
Files already downloaded and verified
313 79
Files already downloaded and verified
Nb test batchs: 79
Visualisation des données¶
[•] Méthode de visualisation¶
#### Visualisation d'images du jeu de données
import matplotlib.pyplot as plt
import numpy as np
def imshow(images, labels, predicted=None):
plt.figure(figsize=(15, 10))
for idx in range(8):
plt.subplot(2, 4, idx+1)
plt.axis('off')
img = (images[idx] * 0.224 + 0.456)#/ 2 + 0.5 # unnormalize
npimg = img.numpy()
plt.axis('off')
plt.imshow(np.transpose(npimg, (1, 2, 0)))
title = str(classes[labels[idx]]) + \
('' if predicted is None else ' - ' + str(classes[predicted[idx]]))
plt.title(title)
plt.show()
Visualisation¶
# get some random training images
dataiter = iter(testloader)
images, labels = dataiter.next()
# show images
imshow(images[:8], labels[:8])
II. Construction du modèle¶
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 16, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(16, 32, 5)
self.fc1 = nn.Linear(32 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 32 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc(x)
return x
model = Net()
# model = model.cuda()
III. Visualisation des filtres/paramètres du modèle à l’initialisation¶
[•] Méthode de visualisation des filtres¶
def visualize_filters(tensor, ch=0, allkernels=False, nrow=8, padding=1):
n,c,w,h = tensor.shape
if allkernels: tensor = tensor.view(n*c, -1, w, h)
elif c != 3: tensor = tensor[:,ch,:,:].unsqueeze(dim=1)
rows = np.min((tensor.shape[0] // nrow + 1, 64))
grid = torchvision.utils.make_grid(tensor, nrow=nrow, normalize=True, padding=padding)
plt.figure( figsize=(nrow,rows) )
plt.imshow(grid.numpy().transpose((1, 2, 0)))
plt.axis('off')
plt.ioff()
plt.show()
Visualisation des filtres¶
filters = model.conv1.weight.data.clone().cpu()
visualize_filters(filters, ch=0, allkernels=False)
Exercice
Que pouvez-vous dire de ces filtres ?
IV. L’apprentissage¶
Fonction objectif, scheduler et optimizer¶
#Choose the loss function
criterion = nn.CrossEntropyLoss()
#Optimizer
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0005)
scheduler = MultiStepLR(optimizer, milestones=[25, 50], gamma=0.1)
[•] Méthodes d’évaluation¶
import cv2
# Let us code a generic prediction function
def predict(model, loader, criterion=nn.CrossEntropyLoss(), feature_extract=False, max_size=0, resize=128):
with torch.no_grad():
if not feature_extract:
model.eval()
y_preds = []
y_labels = []
inputs_ = []
running_loss = 0.0
size = 0.0
for idx, data in enumerate(loader):
inputs, labels = data
# inputs = inputs.cuda()
# labels = labels.cuda()
# wrap them in Variable
outputs = model(inputs)
loss = criterion(outputs, labels)
running_loss += loss.item()
y_preds.extend(outputs.data.tolist())
y_labels.extend(labels.data.tolist())
if size <= max_size and feature_extract:
images = [
cv2.resize(
(
(i*0.224+0.456)*255).astype('uint8').transpose((1, 2, 0)), dsize=(resize, resize)
) for i in inputs.data.cpu().numpy()
]
inputs_.extend(images)
size = len(inputs_)
predictions, labels, inputs = np.asarray(y_preds), np.asarray(y_labels), np.asarray(inputs_)
if not feature_extract:
return predictions, labels, running_loss/len(loader)
return predictions, labels, inputs
def accuracy_topk(predictions, labels, top_k=1):
res = 0
for i, pred in enumerate(predictions):
answer = np.argsort(-pred)[0:top_k]
if labels[i] in answer:
res += 1
acc = float(res) / float(labels.shape[0])
return acc
def evaluate(loader, model, top_k = 1, criterion = nn.CrossEntropyLoss()):
predictions, labels, loss = predict(model, loader, criterion)
return accuracy_topk(predictions, labels, top_k = top_k), loss
[•] Méthode d’apprentissage (i.e. d’optimisation)¶
Exercice
Proposez le code en utilisant une fonction (pour pouvoir réutiliser le code plus tard) permettant d’optimiser votre réseau pendant deux epochs.
Attention, votre code doit renvoyer 4 tableaux : l’historique de la loss de train, l’historique la loss de validation, l’historique de l’accuracy de train et de l’accuracy de test.
####### Complete this part ######## or die ####################
def train(model, criterion, optimizer, scheduler, n_epoch=2):
loss_history = []
valid_loss_history = []
acc_history = []
val_acc_history = []
for epoch in range(n_epoch): # loop over the dataset multiple times
...
...
print('**** Finished Training ****')
return loss_history, valid_loss_history, acc_history, val_acc_history
###############################################################
L’entraînement¶
eval_frequency=1
loss_history, \
valid_loss_history, \
acc_history, \
val_acc_history = train(model, criterion, optimizer, scheduler, n_epoch=2)
Affichage des courbes de loss et de précision¶
[•] Méthode d’affichage des courbes¶
def plot_loss(loss_history, valid_loss_history, acc_history, val_acc_history):
plt.figure()
plt.plot([i*eval_frequency for i in range(1, len(loss_history)+1)], loss_history,
label='Train loss')
plt.plot([i*eval_frequency for i in range(1, len(loss_history)+1)], valid_loss_history,
label='Validation loss')
plt.legend()
plt.show()
plt.figure()
plt.plot([i*eval_frequency for i in range(1, len(acc_history)+1)], acc_history,
label='Train Accuracy')
plt.plot([i*eval_frequency for i in range(1, len(acc_history)+1)], val_acc_history,
label='Validation Accuracy')
plt.legend()
plt.show()
Affichage des courbes¶
plot_loss(loss_history, valid_loss_history, acc_history, val_acc_history)
Question
Que pouvez-vous conclure en regardant la loss et l’accuracy ?
Sauvegarde et chargement du modèle¶
En deep learning l’apprentissage d’un modèle peut prendre énormément de temps. Pensez à toujours sauvegarder votre modèle régulièrement afin de ne pas le perdre. (Attention, il faut parfois aussi sauvegarder les variables liées à l’optimiseur lui-même…)
Sauvegarde¶
torch.save(model.state_dict(), 'my_model.torch')
Chargement¶
model = Net()
model.load_state_dict(torch.load('my_model.torch'))
# model = model.cuda()
<All keys matched successfully>
V. Visualisation des filtres/paramètres appris¶
filters = model.conv1.weight.data.clone().cpu()
visualize_filters(filters, ch=0, allkernels=False)
Question
Que conclure en regardant ces filtres relativement aux filtres avant l’entraînement du modèle ?
VI. Évaluasion du modèle sur l’ensemble de test¶
accuracy, _ = evaluate(testloader, model)
print('Test accuracy: %.3f' % (accuracy), end='\n')
/Users/maximilienservajean/.miniforge3/lib/python3.9/site-packages/torch/nn/functional.py:718: UserWarning: Named tensors and all their associated APIs are an experimental feature and subject to change. Please do not use them for anything important until they are released as stable. (Triggered internally at /tmp/pip-req-build-gqmopi53/c10/core/TensorImpl.h:1156.)
return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
Test accuracy: 0.098
Question
Que dire de l’accuracy ? Quel est le score attendu d’un modèle aléatoire ?
VII. Test des prédictions sur quelques images¶
#Test prediction on some images
dataiter = iter(testloader)
images, labels = dataiter.next()
outputs = model(images[:8])# .to(device)) # we use the loaded model
_, predicted = torch.max(outputs, 1)
imshow(images[:8], labels[:8], predicted[:8])
Tester son modèle sur quelques images peut être intéressant lorsqu’il s’agit de comprendre le type d’erreurs qui sont faites. Ça ne peut JAMAIS être un argument suffisant pour dire que le modèle “marche” !
VIII. Extraction des features et Dataviz¶
Rappelons nous qu’une réseau de neurones est la composition d’une première fonction \(\phi:\mathbb{R}^p\mapsto\mathbb{R}^f\) qui apprend un feature space et d’un classifieur linéaire \(\psi:\mathbb{R}^f\mapsto\mathbb{R}^C\) qui retourne un score pour chacune des classes de notre problèmes à \(C\) classes. Il est intéressant d’étudier la manière dont la fonction \(\phi\) a déformé l’espace d’entrée en regroupant certaines images entre elles, etc. Notons que la fonction \(\phi\) est elle-même une composition et qu’il est possible d’étudier les sorties des différentes couches.
Visualiser la sortie de la fonction \(\phi\) n’est pas directement possible puisqe l’espace possède \(f\) dimensions et que \(f\) est généralement très loin devant \(2\) et \(3\). Il convient donc d’utiliser un algorithme de réduction de dimension. Ces algorithmes fonctionnent très bien sur la sortie de la fonction \(\phi\) car la “dimension effective” de nos données s’y retrouvent très réduites : les images similaires se retrouvent très proches les une des autres et très différentes des autres images, etc.
print(model)
Net(
(conv1): Conv2d(3, 16, kernel_size=(5, 5), stride=(1, 1))
(pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(conv2): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1))
(fc1): Linear(in_features=800, out_features=120, bias=True)
(fc2): Linear(in_features=120, out_features=84, bias=True)
(fc): Linear(in_features=84, out_features=10, bias=True)
)
Dans un modèle \(\texttt{Pytorch}\), la coutume est d’appeler \(\texttt{fc}\) la fonction \(\psi\). Si nous souhaitons récupérer la sortie de la fonction \(\phi\) il suffit de remplacer \(\texttt{fc}\) par la fonction identité. C’est ce que nous faisons maintenant.
# Replacing the classification lyaer by an identify function forward the feature space to the end
# We now may forward and get the features as output of the model
model.fc = nn.Identity()
print(model)
Net(
(conv1): Conv2d(3, 16, kernel_size=(5, 5), stride=(1, 1))
(pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(conv2): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1))
(fc1): Linear(in_features=800, out_features=120, bias=True)
(fc2): Linear(in_features=120, out_features=84, bias=True)
(fc): Identity()
)
Reduction de dimensionalité: PCA, t-SNE, UMAP, etc ….¶
L’idée est ici d’apprendre à partir de l’espace de caractéristique (i.e. de représentation) du modèle, un projecteur qui va faire passer d’un espace de dimension \(f\) à un espace de dimension \(2\) qu’on va pouvoir visualiser sur un graphe. Vous connaissez déjà un certains nombre d’algorithmes de ce type (PCA, t-SNE, etc.). Nous utiliserons ici UMAP qui on pour objectif d’apprendre une fonction \(map : x \rightarrow map(x) : \mathbb{R}^k \rightarrow \mathbb{R}^2\) de sorte à ce que les vecteurs voisins au sens d’une norme (e.g. distance euclidienne \(L_2\)) soient voisin au sens de la norme euclidienne dans l’espace de basse dimension.
#Extract feature vectors:
features, labels, images = predict(
model,
trainloader,
feature_extract=True,
max_size=len(trainset)
)
print(features.shape, labels.shape, images.shape)
labels = [classes[labels[j]] for j in range(labels.shape[0])]
(40000, 84) (40000,) (40000, 128, 128, 3)
umap_2d = umap.umap_.UMAP(n_components=2, random_state=0)
umap_2d.fit(features)
UMAP(dens_frac=0.0, dens_lambda=0.0, random_state=0)
projections_umap = umap_2d.transform(features)
Visualisation avec plotly¶
fig = px.scatter(
projections_umap, x=0, y=1,
color=labels
)
fig.show()
N’hésitez pas à déselectionner en cliquant sur le label associé ou à ne sélectionner qu’une seule catégorie en double cliquant !
Question
Que dire de ce feature space. Permet-il d’expliquer les performances de votre modèle ? Pourquoi ?
Visualisation interactive avec Bokeh¶
En réalité, chaque point de notre feature space est l’image d’un \(x\in\mathbb{R}^d\) par la fonction \(\phi\). Il est particulièrement intéressant d’essayer de visualiser les \(x\) qui ont permis de produire chacun des points. Cela nous permettra de constater les proximités et/ou différences entre les points en fonction de leur proximité/distance.
[•] Méthode de visualisation du feature space¶
from io import BytesIO
from PIL import Image
import base64
import pandas as pd
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import HoverTool, ColumnDataSource, CategoricalColorMapper
from bokeh.palettes import Spectral10, Viridis256, Category20, Turbo256, mpl
import itertools
def plot_feature_space_with_images(classes, images, labels):
output_notebook()
colors = itertools.cycle(Category20[20])
pal = [color for m, color in zip(range(len(classes)), colors)]
np.random.shuffle(pal)
def embeddable_image(data):
image = Image.fromarray(data, mode='RGB')
buffer = BytesIO()
image.save(buffer, format='jpeg')
for_encoding = buffer.getvalue()
return 'data:image/jpeg;base64,' + base64.b64encode(for_encoding).decode()
max_size = 2000
data_df = pd.DataFrame(projections_umap[:max_size], columns=('x', 'y'))
data_df['class'] = [x for x in labels][:max_size]
data_df['image'] = list(map(embeddable_image, images[:max_size]))
datasource = ColumnDataSource(data_df)
color_mapping = CategoricalColorMapper(factors=classes,
palette=pal)
plot_figure = figure(
title='UMAP projection of the dataset',
plot_width=900,
plot_height=600,
tools=('pan, wheel_zoom, reset')
)
plot_figure.add_tools(HoverTool(tooltips="""
<div>
<div>
<img src='@image' style='float: left; margin: 5px 5px 5px 5px'/>
</div>
<div>
<span style='font-size: 16px; color: #224499'>Classe:</span>
<span style='font-size: 18px'>@class</span>
</div>
</div>
"""))
plot_figure.circle(
'x',
'y',
source=datasource,
color=dict(field='class', transform=color_mapping),
line_alpha=0.6,
fill_alpha=0.6,
size=10,
legend_field="class",
)
plot_figure.legend.location = "top_left"
#plot_figure.legend.click_policy="mute"
plot_figure.legend.label_text_font_size = "8px"
show(plot_figure)
Visualisation¶
IX. On recommence avec les Simpsons !¶
Attention, afin de ne pas tout recoder, pensez à exécuter les cellules des sections dont le titre est au format [•] Méthode de … qui contiennent du code réutilisable !
Exercice
Répondez à toutes les questions précédentes dans le cadre de ce nouveau jeu de données et de ce nouveau modèle !
A. Construction du jeu de données¶
# Pour Google Colaboratory
# Décommenter les lignes suivantes
# import os
# from google.colab import drive
# drive.mount('/content/drive')
# dataset_path = '/content/drive/MyDrive/DeepTP/archive/simpsons_dataset'
# dataset_path_test = '/content/drive/MyDrive/DeepTP/archive/kaggle_simpson_testset'
# Chemin local vers le dataset
dataset_path = './data/Simpsons/simpsons_dataset'
dataset_path_test ='./data/Simpsons/kaggle_simpson_testset'
data_transform = transforms.Compose([
transforms.Resize((224, 224)),
# RandomSizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
dataset_train = datasets.ImageFolder(root=dataset_path, transform=data_transform)
trainset, validset, train_sampler, valid_sampler = split_dataset(
dataset_train, valid_size = 0.2, random_state=None
)
trainloader = DataLoader(
trainset, batch_size=batch_size, sampler=train_sampler,
num_workers=num_workers,
)
validloader = DataLoader(
validset, batch_size=batch_size, sampler=valid_sampler,
num_workers=num_workers,
)
print('Number of batches in train/val:', len(trainloader), len(validloader))
# Get the test data from the test directory
dataset_test = datasets.ImageFolder(root=dataset_path_test,
transform=data_transform)
# We don't need to split train val, all test data are in one folder
testloader = DataLoader(
dataset_test, batch_size=batch_size, shuffle=True,
num_workers=num_workers,
)
print('Number of batches in test:', len(testloader))
# We list all the directories in alphabetical order to have the label classes.
classes = [c for c in sorted(os.listdir(dataset_path))]
print('Classes :\n\t- ' + '\n\t- '.join(classes))
B. Visualisation de quelques images¶
dataiter = iter(testloader)
images, labels = dataiter.next()
imshow(images[:8], labels[:8])
C. Construction du modèle¶
Dans certains cas, nous ne voulons apprendre que le classifieur final en espérant que l’espace de représentation appris nous permettra de résoudre notre tâche.
print("Loading existing architecture and init parameters of model pretrained on ImageNet...")
model = models.resnet18(pretrained=True)
finetuning = True
if finetuning:
for p in model.parameters():
p.requires_grad = False
model.fc = nn.Linear(model.fc.in_features, len(classes))
# model = model.cuda()
D. Visualisation des filtres¶
filters = model.conv1.weight.data.clone().cpu()
visualize_filters(filters, ch=0, allkernels=False)
E. L’apprentissage¶
Fonction objectif, scheduler et optimizer¶
#Choose the loss function
criterion = nn.CrossEntropyLoss()
#Optimizer
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0005)
scheduler = MultiStepLR(optimizer, milestones=[25, 50], gamma=0.1)
L’entraînement¶
eval_frequency=1
loss_history, \
valid_loss_history, \
acc_history, \
val_acc_history = train(model, criterion, optimizer, scheduler, n_epoch=2)
Visualisation des courbes de loss et de précision¶
plot_loss(
loss_history, valid_loss_history, acc_history, val_acc_history
)
Sauvegarde et chargement du modèle¶
torch.save(model.state_dict(), 'my_model.torch')
model = Net()
model.load_state_dict(torch.load('my_model.torch'))
# model = model.cuda()
F. Visualisation des filtres/paramètres appris¶
Attention, si le modèle a été finetuné, les filtres n’ont pas été modifiés et sont donc les mêmes qu’au départ.
filters = model.conv1.weight.data.clone().cpu()
visualize_filters(filters, ch=0, allkernels=False)
G. Évaluation du modèle et test de quelques prédictions¶
accuracy, _ = evaluate(testloader, model)
print('Test accuracy: %.3f' % (accuracy), end='\n')
#Test prediction on some images
dataiter = iter(testloader)
images, labels = dataiter.next()
outputs = model(images[:8])# .to(device)) # we use the loaded model
_, predicted = torch.max(outputs, 1)
imshow(images[:8], labels[:8], predicted[:8])
H. Extraction de features et Dataviz¶
#Extract feature vectors:
features, labels, images = predict(
model,
trainloader,
feature_extract=True,
max_size=len(trainset)
)
print(features.shape, labels.shape, images.shape)
labels = [classes[labels[j]] for j in range(labels.shape[0])]
umap_2d = umap.umap_.UMAP(n_components=2, random_state=0)
umap_2d.fit(features)
projections_umap = umap_2d.transform(features)
Visualisation avec plotly¶
fig = px.scatter(#_3d(
projections_umap, x=0, y=1, # z=2,
color=labels
)
fig.show()
Visualisation interactive avec Bokeh¶
plot_feature_space_with_images(classes, images, labels)